Add StreamTimeoutStrategy, StreamTimeoutDecision, and Token-bucket-based timeout limiting strategy #6245
+310
−55
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Motivation
StreamMessage.timeout()currently uses a fixed timeout mode. Differentuse-cases may require custom logic, so we introduce a
StreamTimeoutStrategyinterface. The existing
StreamTimeoutModewill remain as the defaultstrategy.
To tolerate transient network jitter, a token-bucket strategy can delay
timeout until a user-defined number of consecutive delays is reached.
Modifications
StreamTimeoutDecisiontimedOut,hasNextSchedule,nextScheduleTimeNanos.TIMED_OUT,NO_SCHEDULE,scheduleAt(long).StreamTimeoutStrategyinterfaceinitialDecision(long subscribeTimeNanos)– called once right aftersubscription.
evaluateTimeout(long currentTimeNanos, long lastEventTimeNanos)– calledeach time the scheduled timer fires.
StreamTimeoutMode) and token-bucket strategy implementationsWill be added in a follow-up commit.
Result
StreamTimeoutStrategytoStreamMessage.timeout(strategy)to control per-stream timeout behaviour.StreamTimeoutModecontinues to work as the default.